/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.fone.flumeExt.sink.L2HSink;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.Callable;

//import java.util.concurrent.ExecutorService;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
//import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.flume.Clock;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.SystemClock;
//import org.apache.flume.instrumentation.SinkCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fone.flumeExt.sink.FileOutputStreamExt;

// TODO: Auto-generated Javadoc
//import com.fone.flume.sink.localFile.LocalToHDFSSink.WriterCallback;

/**
 * <p>Title: LocalFileWrite.java</p>
 * <p>Description: Appends Flume event bodies to a local file and rolls that file
 * over when a configured event count, byte size, or time interval is reached.</p>
 *
 * <p>While a file is being written it carries a {@code .tmp} suffix; on
 * {@link #close()} the suffix is stripped by renaming the file. The next
 * {@link #append(Event)} after a close transparently opens a new file.</p>
 *
 * <p>{@link #append(Event)}, {@link #flush()} and {@link #close()} are
 * {@code synchronized} on this instance, because the timed roll runs
 * {@link #close()} from a thread of the supplied scheduler.</p>
 *
 * @author Phoenics Chow
 * @version 1.0
 * @date 2014-6-3
 */
public class LocalFileWrite {

	/** The Constant LOG. */
	private static final Logger LOG = LoggerFactory.getLogger(LocalFileWrite.class);

	/** Suffix carried by a file while it is being written; removed on close. */
	private static final String TMP_SUFFIX = ".tmp";

	/** Seconds after opening at which the file is rolled; values &lt;= 0 disable timed rolling. */
	private final long rollInterval;

	/** Number of written bytes at which the file is rolled; values &lt;= 0 disable size rolling. */
	private final long rollSize;

	/** Number of written events at which the file is rolled; values &lt;= 0 disable count rolling. */
	private final long rollCount;

	/** Number of appended events after which the stream is flushed. */
	private final long batchSize;

	/** Directory the files are created in. */
	private final String filePath;

	/** Base name of the created files. */
	private final String fileName;

	/** Prefix used when building the path reported by {@link #getLocalPath()}. */
	private final String inUsePrefix;

	/** Suffix used when building the path reported by {@link #getLocalPath()}. */
	private final String inUseSuffix;

	/** Suffix appended to the file name after the timestamp. */
	private final String fileSuffix;

	/** Scheduler that runs the timed roll. */
	private final ScheduledExecutorService timedRollerPool;

	/** Source of the timestamp embedded in each file name. */
	private final Clock clock = new SystemClock();

	/** Stream of the file currently (or most recently) written. */
	private FileOutputStreamExt outputStream;

	/** Path built from the in-use prefix/suffix; see {@link #getLocalPath()}. */
	private String bucketPath;

	/** Pending timed roll of the current file, or {@code null} if none is scheduled. */
	private ScheduledFuture<Void> timedRollFuture;

	/** Events written to the current file. */
	private long eventCounter;

	/** Bytes written to the current file. */
	private long processSize;

	/** Events appended since the last flush. */
	private long batchCounter;

	/** Whether a file is currently open for writing. */
	private boolean isOpen = false;

	/**
	 * Instantiates a new local file write and immediately opens the first file.
	 *
	 * @param rollInterval seconds until a timed roll; &lt;= 0 disables it
	 * @param rollSize bytes written before rolling; &lt;= 0 disables it
	 * @param rollCount events written before rolling; &lt;= 0 disables it
	 * @param batchSize events appended between automatic flushes
	 * @param context the context (currently unused by this class)
	 * @param filePath directory to create files in; must not be {@code null}
	 * @param fileName base name of the created files
	 * @param inUsePrefix prefix used for the path reported by {@link #getLocalPath()}
	 * @param inUseSuffix suffix used for the path reported by {@link #getLocalPath()}
	 * @param fileSuffix suffix appended to the file name after the timestamp
	 * @param timedRollerPool scheduler used for timed rolls; required when {@code rollInterval > 0}
	 * @throws IOException if {@code filePath} is {@code null} or the file cannot be opened
	 */
	public LocalFileWrite(long rollInterval, long rollSize, long rollCount, long batchSize, Context context, String filePath, String fileName,
			String inUsePrefix, String inUseSuffix, String fileSuffix, ScheduledExecutorService timedRollerPool) throws IOException {
		this.rollInterval = rollInterval;
		this.rollSize = rollSize;
		this.rollCount = rollCount;
		this.batchSize = batchSize;
		this.filePath = filePath;
		this.fileName = fileName;
		this.inUsePrefix = inUsePrefix;
		this.inUseSuffix = inUseSuffix;
		this.fileSuffix = fileSuffix;
		this.timedRollerPool = timedRollerPool;
		open();
	}

	/**
	 * Opens a new {@code <filePath>/<fileName>.<millis><fileSuffix>.tmp} file,
	 * resets all counters and, when a roll interval is configured, schedules
	 * the timed roll.
	 *
	 * @throws IOException if {@code filePath} is {@code null} or the stream cannot be created
	 */
	private void open() throws IOException {
		if (filePath == null) {
			throw new IOException("Invalid file settings");
		}
		long timestamp = clock.currentTimeMillis();
		String fullFileName = fileName + "." + timestamp + fileSuffix;
		bucketPath = filePath + "/" + inUsePrefix + fullFileName + inUseSuffix;
		String targetPath = filePath + "/" + fullFileName + TMP_SUFFIX;
		outputStream = new FileOutputStreamExt(targetPath, true);
		LOG.info("open file {}", targetPath);
		resetCounters();
		if (rollInterval > 0) {
			Callable<Void> action = new Callable<Void>() {
				@Override
				public Void call() throws Exception {
					LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.", bucketPath, rollInterval);
					try {
						close();
					} catch (Throwable t) {
						// scheduler boundary: log instead of losing the failure in the discarded future
						LOG.error("Unexpected error", t);
					}
					return null;
				}
			};
			timedRollFuture = timedRollerPool.schedule(action, rollInterval, TimeUnit.SECONDS);
		}
		isOpen = true;
	}

	/**
	 * Flushes the current file, if one is open, and restarts batch counting.
	 *
	 * @throws IOException if flushing the stream fails
	 */
	public synchronized void flush() throws IOException {
		// The file may already have been closed by a roll; there is nothing to flush then.
		if (isOpen) {
			outputStream.flush();
		}
		batchCounter = 0;
	}

	/**
	 * Closes the current file, strips its {@code .tmp} suffix and cancels any
	 * pending timed roll. Calling this while no file is open is a no-op, so an
	 * explicit close after a roll has already closed the file is harmless.
	 *
	 * @throws IOException if closing the stream fails; the writer is still
	 *         marked closed so the next append starts a fresh file
	 */
	public synchronized void close() throws IOException {
		if (!isOpen) {
			return;
		}
		try {
			outputStream.close();
			File file = outputStream.getFile();
			LOG.info("file ({}): closed.", file.getPath());
			promoteTempFile(file);
		} finally {
			// Reset state even when closing failed, so the writer is not left
			// pointing at a broken stream with a roll still scheduled.
			isOpen = false;
			batchCounter = 0;
			cancelTimedRoll();
		}
	}

	/**
	 * Renames a finished {@code .tmp} file to its final name (the same path
	 * without the suffix). Files without the suffix are left untouched.
	 *
	 * @param file the file that has just been closed
	 */
	private void promoteTempFile(File file) {
		String tmpPath = file.getPath();
		if (!tmpPath.endsWith(TMP_SUFFIX)) {
			return;
		}
		String finalPath = tmpPath.substring(0, tmpPath.length() - TMP_SUFFIX.length());
		if (file.renameTo(new File(finalPath))) {
			LOG.info("file {}  rename to {}  .", tmpPath, finalPath);
		} else {
			// renameTo reports failure only through its return value
			LOG.warn("file {} could not be renamed to {}", tmpPath, finalPath);
		}
	}

	/**
	 * Cancels the pending timed roll, if any, and forgets the future.
	 */
	private void cancelTimedRoll() {
		if (timedRollFuture == null) {
			return;
		}
		if (!timedRollFuture.isDone()) {
			// false: never interrupt the task, it may be the very caller of close()
			timedRollFuture.cancel(false);
		}
		timedRollFuture = null;
	}

	/**
	 * Writes the body of the event to the current file, opening a new file
	 * first if none is open. Flushes once {@code batchSize} events have been
	 * appended since the last flush, and flushes and closes the file when a
	 * roll threshold is reached.
	 *
	 * @param event the event whose body is written
	 * @throws IOException if opening, writing, flushing or closing fails
	 */
	public synchronized void append(Event event) throws IOException {
		if (!isOpen) {
			open();
		}

		byte[] body = event.getBody();
		outputStream.write(body);
		processSize += body.length;
		eventCounter++;
		batchCounter++;
		if (batchCounter >= batchSize) {
			flush();
		}
		if (shouldRotate()) {
			flush();
			close();
		}
	}

	/**
	 * Checks the count and size thresholds for the current file.
	 *
	 * @return true if the event count or the byte size has reached its
	 *         (enabled) roll threshold
	 */
	private boolean shouldRotate() {
		boolean doRotate = false;
		if ((rollCount > 0) && (rollCount <= eventCounter)) {
			LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter);
			doRotate = true;
		}

		if ((rollSize > 0) && (rollSize <= processSize)) {
			LOG.debug("rolling: rollSize: {}, bytes: {}", rollSize, processSize);
			doRotate = true;
		}

		return doRotate;
	}

	/**
	 * Resets the per-file event, byte and batch counters.
	 */
	private void resetCounters() {
		eventCounter = 0;
		processSize = 0;
		batchCounter = 0;
	}

	/**
	 * Gets the local path computed for the most recently opened file.
	 *
	 * <p>Note that this is
	 * {@code <filePath>/<inUsePrefix><fileName>.<millis><fileSuffix><inUseSuffix>},
	 * which is not the same string as the {@code .tmp} path the data is
	 * actually written to.</p>
	 *
	 * @return the local path
	 */
	public String getLocalPath() {
		return bucketPath;
	}
}
